package org.idea.qiyu.threadpool.demo.asynchandle;

import org.springframework.stereotype.Component;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.atomic.AtomicLong;

/**
 * {@link AsyncHandlerService} implementation that fans submitted tasks out to a fixed
 * set of child worker threads.
 *
 * <p>Tasks submitted through {@link #putTask(AsyncHandlerData)} land in one shared bounded
 * queue. After {@link #startJob()} is called, a daemon dispatcher thread drains that queue
 * and hands the tasks to the child workers in round-robin order; each child worker
 * processes its own bounded sub-queue sequentially.
 *
 * <p>Original author's note: the CommandLineRunner interface runs in a single blocking
 * thread, so be careful where it is used. It is recommended to start daemon threads after
 * startup to trigger the execution of tasks.
 *
 * @Author linhao
 * @Date created in 10:08 AM 2021/10/17
 */
@Component("asyncMultiConsumerHandlerHandler")
public class AsyncMultiConsumerHandlerHandler implements AsyncHandlerService{

    /** Dispatcher owning the child workers; constructing it starts 10 child worker threads. */
    private final TaskQueueHandler taskQueueHandler = new TaskQueueHandler(10);

    /**
     * Offers a task to the shared queue without blocking.
     *
     * @param asyncHandlerData the task to enqueue
     * @return {@code true} if the task was accepted, {@code false} if the shared queue is full
     */
    @Override
    public boolean putTask(AsyncHandlerData asyncHandlerData) {
        return taskQueueHandler.addTask(asyncHandlerData);
    }

    /**
     * Starts a daemon thread that dispatches queued tasks to the child workers.
     */
    @Override
    public void startJob(){
        Thread thread = new Thread(taskQueueHandler);
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Distributes tasks from the shared queue to the child sub-queues.
     */
    static class TaskQueueHandler implements Runnable {

        /** Shared inbound queue; static, so it is common to every TaskQueueHandler instance. */
        private static final BlockingQueue<AsyncHandlerData> tasks = new ArrayBlockingQueue<>(11);

        /**
         * Exposes the live shared inbound queue (not a copy).
         *
         * @return the shared task queue
         */
        public static BlockingQueue<AsyncHandlerData> getAllTaskInfo() {
            return tasks;
        }

        private final TaskDispatcherHandler[] taskDispatcherHandlers;

        private final int childConsumerSize;

        /**
         * Creates the child workers and starts one thread per worker.
         *
         * @param childConsumerSize number of child workers to create; must be positive
         * @throws IllegalArgumentException if {@code childConsumerSize} is not positive
         */
        public TaskQueueHandler(int childConsumerSize) {
            // Fail fast: with zero workers the dispatch loop would fail on the first task.
            if (childConsumerSize <= 0) {
                throw new IllegalArgumentException("childConsumerSize must be positive: " + childConsumerSize);
            }
            this.childConsumerSize = childConsumerSize;
            taskDispatcherHandlers = new TaskDispatcherHandler[childConsumerSize];
            for (int i = 0; i < taskDispatcherHandlers.length; i++) {
                taskDispatcherHandlers[i] = new TaskDispatcherHandler(new ArrayBlockingQueue<>(100), "child-worker-" + i);
                Thread thread = new Thread(taskDispatcherHandlers[i]);
                // Child workers are explicitly non-daemon threads.
                thread.setDaemon(false);
                thread.setName("taskQueueHandler-child-"+i);
                thread.start();
            }
        }

        /**
         * Offers a task to the shared queue without blocking.
         *
         * @param asyncHandlerData the task to enqueue
         * @return {@code true} if accepted, {@code false} if the shared queue is full
         */
        public boolean addTask(AsyncHandlerData asyncHandlerData) {
            return tasks.offer(asyncHandlerData);
        }


        /**
         * Dispatch loop: takes tasks from the shared queue and hands them to the child
         * workers in round-robin order. Blocks while the target child's sub-queue is full
         * instead of failing, so no task is dropped and the shared queue provides
         * back-pressure to {@link #addTask(AsyncHandlerData)}. Exits when interrupted.
         */
        @Override
        public void run() {
            int index = 0;
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = tasks.take();
                    taskDispatcherHandlers[index].putAsyncHandlerData(asyncHandlerData);
                    index = (index + 1) % taskDispatcherHandlers.length;
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of spinning forever.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Child worker that processes the tasks of a single sub-queue one at a time.
     */
    static class TaskDispatcherHandler implements Runnable {

        private final BlockingQueue<AsyncHandlerData> subTaskQueue;

        private final String childName;

        /** Number of tasks this worker has taken from its sub-queue. */
        private final AtomicLong taskCount = new AtomicLong(0);

        /**
         * @param blockingQueue the sub-queue this worker consumes
         * @param childName     name used as a prefix in this worker's output
         */
        public TaskDispatcherHandler(BlockingQueue<AsyncHandlerData> blockingQueue, String childName) {
            this.subTaskQueue = blockingQueue;
            this.childName = childName;
        }

        /**
         * Adds a task to the sub-queue without blocking.
         *
         * @param asyncHandlerData the task to enqueue
         * @throws IllegalStateException if the sub-queue is full
         */
        public void addAsyncHandlerData(AsyncHandlerData asyncHandlerData) {
            subTaskQueue.add(asyncHandlerData);
        }

        /**
         * Adds a task to the sub-queue, waiting for free space if the queue is full.
         *
         * @param asyncHandlerData the task to enqueue
         * @throws InterruptedException if interrupted while waiting for space
         */
        void putAsyncHandlerData(AsyncHandlerData asyncHandlerData) throws InterruptedException {
            subTaskQueue.put(asyncHandlerData);
        }

        /**
         * Worker loop: takes a task, prints a start line, sleeps 3 seconds, then prints a
         * completion line. Exits when interrupted; any other exception is printed and the
         * loop continues with the next task.
         */
        @Override
        public void run() {
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = subTaskQueue.take();
                    long count = taskCount.incrementAndGet();
                    System.out.println("【" + childName + "】子任务队列处理：" + asyncHandlerData.getDataInfo() + count);
                    Thread.sleep(3000);
                    System.out.println("【" + childName + "】子任务队列处理：" + asyncHandlerData.getDataInfo()+" 任务处理结束" + count);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of swallowing the interrupt.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // A failing task must not kill the worker; report it and keep going.
                    e.printStackTrace();
                }
            }
        }
    }

}
